/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.rpc.client.local;

import io.iec.edp.caf.boot.context.CAFContext;
import io.iec.edp.caf.common.JSONSerializer;
import io.iec.edp.caf.commons.exception.CAFRuntimeException;
import io.iec.edp.caf.commons.exception.entity.DefaultExceptionProperties;
import io.iec.edp.caf.commons.exception.entity.ExceptionErrorCode;
import io.iec.edp.caf.commons.exception.ExceptionLevel;
import io.iec.edp.caf.commons.utils.InvokeService;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.core.context.ICAFContextService;
import io.iec.edp.caf.core.session.CafSession;
import io.iec.edp.caf.logging.CommonConstant;
import io.iec.edp.caf.rpc.api.entity.RpcServiceMethodDefinition;
import io.iec.edp.caf.rpc.api.event.RpcClientEventBroker;
import io.iec.edp.caf.rpc.api.event.RpcServerEventBroker;
import io.iec.edp.caf.rpc.api.support.ConstanceVarible;
import io.iec.edp.caf.rpc.api.utils.InternalSvrContainer;
import io.iec.edp.caf.rpc.api.utils.RpcAppContextUtils;
import io.iec.edp.caf.tenancy.api.ITenantService;
import io.iec.edp.caf.tenancy.api.context.RequestTenantContextHolder;
import io.iec.edp.caf.tenancy.api.context.RequestTenantContextInfo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.util.Assert;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link RpcLocalInvoker} implementation that dispatches an RPC call to a service living in the
 * same process.
 *
 * <p>Each invocation fires the client-side pre/post/exception events, resolves the target bean and
 * method reflectively, and runs the method either on the calling thread or - when
 * {@link #isolated(Integer, String)} decides so - on a separate thread with its own session and
 * tenant context.
 *
 * @author Leon Huo
 */
@Slf4j
public class RpcLocalInvokerImpl implements RpcLocalInvoker{
    private static final String HTTP_THREAD_NAME="http-nio-";
    private static final String RPC_THREAD_NAME="RPC-Server-";
    // See jianshu.com/p/f343782f19fc - this pool exists to work around a thread deadlock problem.
    // NOTE(review): with CallerRunsPolicy a saturated pool executes the task on the submitting
    // thread, in which case the "isolated" prepare/clear steps act on the caller's own context -
    // confirm this is acceptable.
    private static final ThreadPoolExecutor executor =
            // TODO only 32 threads are allowed here, which looks risky; ideally load-test this or
            //  make it configurable through yaml.
            new ThreadPoolExecutor(1, 32, 10, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(1),
                    new ThreadFactory() {
                        private final AtomicInteger id = new AtomicInteger(0);
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setName(RPC_THREAD_NAME + id.incrementAndGet());
                            return thread;
                        }
                    }, new ThreadPoolExecutor.CallerRunsPolicy());

    private final RpcClientEventBroker clientEventBroker;

    private final RpcServerEventBroker serverEventBroker;

    private final ICAFContextService cafContextService;

    /**
     * @param clientEventBroker broker used to fire the client-side invocation events
     * @param serverEventBroker broker used to fire the server-side invocation events
     * @param cafContextService access to the current session, tenant and service unit
     */
    public RpcLocalInvokerImpl(RpcClientEventBroker clientEventBroker, RpcServerEventBroker serverEventBroker, ICAFContextService cafContextService) {
        this.clientEventBroker = clientEventBroker;
        this.serverEventBroker = serverEventBroker;
        this.cafContextService = cafContextService;
    }

    /**
     * Invokes a local service method and returns its result.
     *
     * <p>Fires the client pre-invoke event, runs the invocation, then fires either the post-invoke
     * event or - on failure - the exception event before rethrowing.
     *
     * @param clazz                      expected result type
     * @param serviceId                  service id (used in the "service not found" error)
     * @param serviceUnitName            target service unit
     * @param parameters                 method arguments keyed by name
     * @param rpcServiceMethodDefinition definition of the method to call
     * @param tenantId                   tenant the call should run under, may be {@code null}
     * @param eventContext               event context; the target interface name is added to it
     * @return the value returned by the target method
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T invokeLocalService(Class<T> clazz, String serviceId,String serviceUnitName, HashMap<String, Object> parameters, RpcServiceMethodDefinition rpcServiceMethodDefinition, Integer tenantId,HashMap<String, String> eventContext) throws IllegalAccessException, InvocationTargetException {
        // Interface class name of the target service.
        String interfaceString = rpcServiceMethodDefinition.getParentDefinition().getClassName();

        eventContext.put(ConstanceVarible.CURRENT_SERVICE_INTERFACE, interfaceString);

        // Context shared between the client-side events of this single call.
        HashMap<String, Object> localContext = new HashMap<>();
        // Pre-invoke event.
        this.clientEventBroker.firePreRpcInvokeEvent(eventContext, parameters, localContext);

        // Run the RPC.
        T result;
        HashMap<String, String> serverContextDict;
        try {
            HashMap<String, Object> serverThread = fireRpcInvocation(clazz,serviceUnitName, eventContext, interfaceString,
                    rpcServiceMethodDefinition, serviceId, parameters, tenantId);
            result = (T) serverThread.get("result");
            serverContextDict = (HashMap<String, String>) serverThread.get("context");
        } catch (Exception e) {
            this.clientEventBroker.fireExceptionRpcInvokeEvent(eventContext, parameters, localContext, e);
            throw e;
        }
        // Post-invoke event.
        this.clientEventBroker.firePostRpcInvokeEvent(serverContextDict, result, localContext);

        return result;
    }

    /**
     * Invokes a local service method whose result is an {@link InputStream}.
     *
     * <p>Same event flow as {@link #invokeLocalService}; the arguments are copied through
     * {@link #getStreamParameters} (without an input stream) before the call. The client events
     * still receive the original {@code parameters} map.
     *
     * @param version not used by this implementation
     * @return the stream returned by the target method
     */
    @Override
    @SuppressWarnings("unchecked")
    public InputStream invokeLocalServiceStream(String serviceId, String version,String serviceUnitName, HashMap<String, Object> parameters, RpcServiceMethodDefinition rpcServiceMethodDefinition, Integer tenantId, HashMap<String, String> eventContext) throws IllegalAccessException, InvocationTargetException {
        // Interface class name of the target service.
        String interfaceString = rpcServiceMethodDefinition.getParentDefinition().getClassName();
        eventContext.put(ConstanceVarible.CURRENT_SERVICE_INTERFACE, interfaceString);

        // Context shared between the client-side events of this single call.
        HashMap<String, Object> localContext = new HashMap<>();
        // Pre-invoke event.
        this.clientEventBroker.firePreRpcInvokeEvent(eventContext, parameters, localContext);

        // Run the RPC.
        InputStream result;
        HashMap<String, String> serverContextDict;
        try {
            HashMap<String, Object> newParams = getStreamParameters(null,parameters);
            HashMap<String, Object> serverThread = fireRpcInvocation(InputStream.class,serviceUnitName, eventContext, interfaceString,
                    rpcServiceMethodDefinition, serviceId, newParams, tenantId);
            result = (InputStream) serverThread.get("result");
            serverContextDict = (HashMap<String, String>) serverThread.get("context");
        } catch (Exception e) {
            this.clientEventBroker.fireExceptionRpcInvokeEvent(eventContext, parameters, localContext, e);
            throw e;
        }
        // Post-invoke event.
        this.clientEventBroker.firePostRpcInvokeEvent(serverContextDict, result, localContext);

        return result;
    }

    /**
     * Invokes a local service method that consumes an {@link InputStream}.
     *
     * <p>Same event flow as {@link #invokeLocalService}; {@code stream} (when non-null) is added
     * to the arguments under {@link ConstanceVarible#RPC_INPUT_STREAM}. The client events still
     * receive the original {@code parameters} map.
     *
     * @param stream  input stream handed to the target method, may be {@code null}
     * @param version not used by this implementation
     * @return the value returned by the target method
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T invokeLocalServiceStream(InputStream stream, Class<T> clazz, String serviceId, String version, String serviceUnitName, HashMap<String, Object> parameters, RpcServiceMethodDefinition rpcServiceMethodDefinition, Integer tenantId, HashMap<String, String> eventContext) throws IllegalAccessException, InvocationTargetException {
        // Interface class name of the target service.
        String interfaceString = rpcServiceMethodDefinition.getParentDefinition().getClassName();

        eventContext.put(ConstanceVarible.CURRENT_SERVICE_INTERFACE, interfaceString);

        // Context shared between the client-side events of this single call.
        HashMap<String, Object> localContext = new HashMap<>();
        // Pre-invoke event.
        this.clientEventBroker.firePreRpcInvokeEvent(eventContext, parameters, localContext);

        // Run the RPC.
        T result;
        HashMap<String, String> serverContextDict;
        try {
            HashMap<String, Object> newParams = getStreamParameters(stream,parameters);
            HashMap<String, Object> serverThread = fireRpcInvocation(clazz,serviceUnitName, eventContext, interfaceString,
                    rpcServiceMethodDefinition, serviceId, newParams, tenantId);
            result = (T) serverThread.get("result");
            serverContextDict = (HashMap<String, String>) serverThread.get("context");
        } catch (Exception e) {
            this.clientEventBroker.fireExceptionRpcInvokeEvent(eventContext, parameters, localContext, e);
            throw e;
        }
        // Post-invoke event.
        this.clientEventBroker.firePostRpcInvokeEvent(serverContextDict, result, localContext);

        return result;
    }

    /**
     * Builds the argument map for a stream invocation: the stream (if any) under
     * {@link ConstanceVarible#RPC_INPUT_STREAM}, followed by a copy of every entry of
     * {@code parameters}.
     *
     * <p>NOTE(review): {@link #doInvoke} passes the map values positionally in iteration order,
     * and the copy made here is a plain {@link HashMap}, which does not retain the source map's
     * order - confirm this is intended.
     */
    private HashMap<String, Object> getStreamParameters(InputStream stream,HashMap<String, Object> parameters){
        HashMap<String, Object> newParams = new HashMap<>();
        if (stream != null) {
            newParams.put(ConstanceVarible.RPC_INPUT_STREAM, stream);
        }
        // Entry-by-entry copy (rather than putAll) keeps the table growth, and therefore the
        // resulting iteration order, the same as it has always been.
        parameters.forEach(newParams::put);
        return newParams;
    }

    /**
     * Runs the target method, either on the calling thread or on a separate thread when
     * {@link #isolated(Integer, String)} returns {@code true}.
     *
     * @param <T>                        result type (unused here, kept for the callers)
     * @param serviceUnitName            target service unit
     * @param eventContext               event context
     * @param interfaceString            interface class name
     * @param rpcServiceMethodDefinition service method definition
     * @param serviceId                  service id
     * @param parameters                 method arguments
     * @param tenantId                   tenant id, may be {@code null}
     * @return map holding the method result under {@code "result"} and the event context under
     *         {@code "context"}
     * @throws InvocationTargetException declared for callers
     * @throws IllegalAccessException    declared for callers
     */
    private <T> HashMap<String, Object> fireRpcInvocation(Class<T> clazz,String serviceUnitName, HashMap<String, String> eventContext, String interfaceString,
                                                          RpcServiceMethodDefinition rpcServiceMethodDefinition, String serviceId, HashMap<String, Object> parameters, Integer tenantId) throws InvocationTargetException, IllegalAccessException {
        // Capture the session of the calling thread so it can be handed to an isolated thread.
        CafSession session = this.cafContextService.getCurrentSession();
        // The method name is the last dot-separated segment of the definition id.
        String definitionId = rpcServiceMethodDefinition.getId();
        String methodName = definitionId.substring(definitionId.lastIndexOf('.') + 1);

        if (isolated(tenantId, serviceUnitName)) {
            return invokeIsolated(session, serviceId, serviceUnitName, eventContext, parameters, tenantId, interfaceString, methodName);
        }

        // Remember the client SU up front so it can be put back after the call.
        String clientSU = this.cafContextService.getCurrentSU();
        try {
            prepareRpcServerContext(false, session, tenantId, serviceUnitName);
            return doInvoke(serviceId, serviceUnitName, eventContext, parameters, tenantId, interfaceString, methodName);
        } finally {
            clearRpcServerContext(false, tenantId, clientSU);
        }
    }

    /**
     * Runs {@link #doInvoke} on another thread with its own session/tenant context and waits for
     * the outcome.
     *
     * <p>A failure of the invocation is rethrown as-is when it is a {@link RuntimeException}, an
     * {@link Error} or one of the declared checked exceptions, so this path reports the same
     * exception as the non-isolated one; any other failure is wrapped in a
     * {@link RuntimeException} with the original as cause.
     */
    private HashMap<String, Object> invokeIsolated(CafSession session, String serviceId, String serviceUnitName,
                                                   HashMap<String, String> eventContext, HashMap<String, Object> parameters,
                                                   Integer tenantId, String interfaceString, String methodName) throws InvocationTargetException, IllegalAccessException {
        try {
            // No return statement inside finally: returning from there would silently discard any
            // exception raised by the invocation.
            Callable<HashMap<String, Object>> isolatedCall = () -> {
                try {
                    prepareRpcServerContext(true, session, tenantId, serviceUnitName);
                    return doInvoke(serviceId, serviceUnitName, eventContext, parameters, tenantId, interfaceString, methodName);
                } finally {
                    clearRpcServerContext(true, tenantId, "");
                }
            };
            FutureTask<HashMap<String, Object>> futureTask = new FutureTask<>(isolatedCall);
            // Requests arriving on an HTTP worker thread go through the RPC pool; any other
            // caller gets a dedicated non-pooled thread to avoid deadlocking a pool.
            if (Thread.currentThread().getName().startsWith(HTTP_THREAD_NAME)) {
                executor.execute(futureTask);
            } else {
                new Thread(futureTask).start();
            }
            return futureTask.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            if (cause instanceof InvocationTargetException) {
                throw (InvocationTargetException) cause;
            }
            if (cause instanceof IllegalAccessException) {
                throw (IllegalAccessException) cause;
            }
            throw new RuntimeException(cause.getMessage(), cause);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Sets up the context the target method runs in.
     *
     * <p>For an isolated call the session is installed on the current thread and, if the tenant
     * differs from the current one, the tenant context is switched. The current SU is always set
     * to {@code serviceUnitName}.
     */
    private void prepareRpcServerContext(boolean isolated, CafSession session, Integer tenantId, String serviceUnitName){
        if (isolated) {
            // Install the caller's session on this thread.
            this.cafContextService.setCurrentThreadPoolSession(session);

            // Switch tenant when it differs from the current one.
            if (tenantId != null && tenantId != this.cafContextService.getTenantId()) {
                RequestTenantContextHolder.set(new RequestTenantContextInfo(tenantId));
            }
        }
        this.cafContextService.setCurrentSU(serviceUnitName);
    }

    /**
     * Undoes {@link #prepareRpcServerContext}: for an isolated call clears the thread's session
     * and restores the tenant context, then sets the current SU to {@code serviceUnitName}.
     *
     * <p>NOTE(review): the tenant context is restored whenever {@code tenantId} is non-null,
     * while it is only set when the tenant differed - confirm {@code restore()} is safe without a
     * matching {@code set()}.
     */
    private void clearRpcServerContext(boolean isolated, Integer tenantId, String serviceUnitName){
        if (isolated) {
            // Drop the session installed for this call.
            this.cafContextService.clearCurrentThreadPoolSession();

            // Put the tenant context back.
            if (tenantId != null) {
                RequestTenantContextHolder.restore();
            }
        }
        this.cafContextService.setCurrentSU(serviceUnitName);
    }

    /**
     * Decides whether the call must run on a separate thread.
     *
     * <p>Returns {@code false} when {@code tenantId} is {@code null} or {@code su} is
     * {@code null}/empty. Otherwise returns {@code true} when the tenant differs from the current
     * one, or when the current SU and the target SU resolve to database connections with
     * different ids.
     */
    private boolean isolated(Integer tenantId,String su){
        if (tenantId == null || su == null || su.length() == 0) {
            log.info("tenant and su is null can not sure isolated，return default false");
            return false;
        }

        // A different tenant needs isolation.
        // NOTE(review): this is a numeric comparison only if getTenantId() returns a primitive -
        // confirm.
        boolean tenantDiffers = tenantId != this.cafContextService.getTenantId();

        ITenantService tenant = SpringBeanUtils.getBean(ITenantService.class);
        String defaultAppInstance = "pg01";
        var curSuDB = tenant.getDBConnInfo(tenantId, defaultAppInstance, CAFContext.current.getCurrentSU());
        var targetSuDB = tenant.getDBConnInfo(tenantId, defaultAppInstance, su);
        // Current and target SU on different database connections also need isolation. The ids
        // are compared by value; a reference comparison would report equal ids as different.
        boolean databaseDiffers = curSuDB != null && targetSuDB != null
                && !Objects.equals(curSuDB.getId(), targetSuDB.getId());
        return tenantDiffers || databaseDiffers;
    }

    /**
     * Resolves the service instance and method, fires the server-side events and invokes the
     * method reflectively.
     *
     * <p>An {@link InvocationTargetException} is unwrapped so that the exception raised by the
     * target method itself is what propagates (via {@code @SneakyThrows}).
     *
     * @return map holding the method result under {@code "result"} and {@code eventContext} under
     *         {@code "context"}
     * @throws CAFRuntimeException if the interface, bean or method cannot be resolved
     */
    @SneakyThrows
    private HashMap<String, Object> doInvoke(String serviceId, String serviceUnitName, HashMap<String, String> eventContext,
                                             HashMap<String, Object> parameters, Integer tenantId, String interfaceClass,
                                             String methodName) throws InvocationTargetException, IllegalAccessException{
        // Resolve the instance and the method to execute.
        Method method;
        Object instance = null;
        try {
            // The class must be loaded through the class loader held by the thread; classes of
            // an SU are not visible to this class's own loader (LaunchedURLClassLoader).
            Class iType = InvokeService.getClass(interfaceClass);
            try {
                instance = SpringBeanUtils.getBean(iType);
            } catch (NoSuchBeanDefinitionException e) {
                log.info(e.getMessage(),e);
                // Not a Spring bean - fall back to the internal service container.
                instance = InternalSvrContainer.getService(iType);
            }

            method = RpcAppContextUtils.getMethodByName(iType, methodName);
            Assert.isTrue(method != null && instance != null, "rpc service instance or method not resolved");
        } catch(Exception e) {
            // The exception thrown below carries no cause, so record the real one here.
            log.error("rpc local invoke failed to resolve service: {}", serviceId, e);
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.serviceNotFound,
                    new String[]{serviceId},
                    null, ExceptionLevel.Error, false);
        }

        // Execute the method.
        Object result;
        HashMap<String, Object> localDict = new HashMap<>();
        try {
            // Server-side pre-invoke event.
            localDict.put(CommonConstant.LOG_MSU_ID, serviceUnitName);
            this.serverEventBroker.firePreRpcInvokeEvent(eventContext, parameters, localDict);
            // Arguments are passed positionally, in the iteration order of the map.
            // NOTE(review): presumably callers supply an order-preserving HashMap subclass -
            // confirm.
            Object[] paramValueArray = parameters.values().toArray();

            if(log.isDebugEnabled()){
                log.debug("rpc local invoke params:"+ JSONSerializer.serialize(parameters));
                log.debug("rpc local invoke method:"+ method.getName());
            }

            long start = System.currentTimeMillis();
            result = method.invoke(instance, paramValueArray);
            long end = System.currentTimeMillis();
            log.info("rpc local invoke target method time：{} ms",end-start);

            // Server-side post-invoke event.
            this.serverEventBroker.firePostRpcInvokeEvent(eventContext, result, localDict);
        } catch (Exception e) {
            Throwable t;
            log.debug("rpc local invoke exception type: {}",e.getClass().getName());
            if (e instanceof InvocationTargetException) {
                Throwable target = ((InvocationTargetException) e).getTargetException();
                log.debug("rpc local invoke invocationTargetException type:"+target.getClass().getName(),target);
                t = target;
            } else {
                t = e;
            }
            // The event receives the exception as caught; the caller receives the unwrapped one.
            this.serverEventBroker.fireExceptionRpcInvokeEvent(eventContext, parameters, localDict,  e);
            throw t;
        }

        HashMap<String, Object> resultHash = new HashMap<>();
        resultHash.put("result", result);
        resultHash.put("context", eventContext);
        return resultHash;
    }
}
